change buffer
authorJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 27 Sep 2018 13:07:17 +0000 (15:07 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 27 Sep 2018 13:07:17 +0000 (15:07 +0200)
include/siri/db/buffer.h
itest/test_insert.py
src/siri/db/buffer.c
src/siri/db/series.c

index 50c30baa08d5c763e5a05d0b72ba4dd0f6b69a65..c5e60a787aec0c120be1c5debe4db848fab1b13e 100644 (file)
@@ -27,16 +27,13 @@ int siridb_buffer_open(siridb_t * siridb);
 
 int siridb_buffer_load(siridb_t * siridb);
 
-int siridb_buffer_write_len(
+int siridb_buffer_write_empty(
         siridb_t * siridb,
         siridb_series_t * series);
 
-
-int siridb_buffer_write_point(
+int siridb_buffer_write_last_point(
         siridb_t * siridb,
-        siridb_series_t * series,
-        uint64_t * ts,
-        qp_via_t * val);
+        siridb_series_t * series);
 
 int siridb_buffer_fsync(siridb_t * siridb);
 
index c4eedeba5c6048e04c536401a7800c155afc1421..7956aaa32b88bf7e3114a5bd68c99e669d58c7c0 100644 (file)
@@ -24,7 +24,8 @@ TIME_PRECISION = 'ns'
 class TestInsert(TestBase):
     title = 'Test inserts and response'
 
-    GEN_POINTS = functools.partial(gen_points, n=1, time_precision=TIME_PRECISION)
+    GEN_POINTS = functools.partial(
+        gen_points, n=1, time_precision=TIME_PRECISION)
 
     async def _test_series(self, client):
 
@@ -34,20 +35,30 @@ class TestInsert(TestBase):
         result = await client.query('select * from "series int"')
         self.assertEqual(result['series int'], self.series_int)
 
-        result = await client.query('list series name, length, type, start, end')
+        result = await client.query(
+            'list series name, length, type, start, end')
         result['series'].sort()
         self.assertEqual(
             result,
             {   'columns': ['name', 'length', 'type', 'start', 'end'],
                 'series': [
-                    ['series float', 10000, 'float', self.series_float[0][0], self.series_float[-1][0]],
-                    ['series int', 10000, 'integer', self.series_int[0][0], self.series_int[-1][0]],
+                    [
+                        'series float',
+                        10000, 'float',
+                        self.series_float[0][0],
+                        self.series_float[-1][0]],
+                    [
+                        'series int', 10000,
+                        'integer',
+                        self.series_int[0][0],
+                        self.series_int[-1][0]],
                 ]
             })
 
     async def insert(self, client, series, n, timeout=1):
         for _ in range(n):
-            await client.insert_some_series(series, timeout=timeout, points=self.GEN_POINTS)
+            await client.insert_some_series(
+                series, timeout=timeout, points=self.GEN_POINTS)
             await asyncio.sleep(1.0)
 
     @default_test_setup(2, time_precision=TIME_PRECISION, compression=False)
@@ -62,9 +73,11 @@ class TestInsert(TestBase):
             await self.client0.insert([]),
             {'success_msg': 'Successfully inserted 0 point(s).'})
 
-        self.series_float = gen_points(tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
+        self.series_float = gen_points(
+            tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
         random.shuffle(self.series_float)
-        self.series_int = gen_points(tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
+        self.series_int = gen_points(
+            tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m')
         random.shuffle(self.series_int)
 
         self.assertEqual(
@@ -96,6 +109,13 @@ class TestInsert(TestBase):
         with self.assertRaises(InsertError):
             await self.client0.insert({'no points': [[]]})
 
+        self.assertEqual(
+            await self.client0.insert({
+                'ts_zero': [[0, 1]]
+                }), {'success_msg': 'Successfully inserted 1 point(s).'})
+
+        await self.client0.query('drop series "ts_zero"')
+
         with self.assertRaises(InsertError):
             await self.client0.insert([{'name': 'no points', 'points': []}])
 
@@ -103,6 +123,11 @@ class TestInsert(TestBase):
         with self.assertRaises(InsertError):
             await self.client0.insert({'invalid ts': [[0.5, 6]]})
 
+        # timestamps should be interger values
+        with self.assertRaises(InsertError):
+            await self.client0.insert(
+                {'invalid ts': [[-1, 6]]})
+
         # empty series name is not allowed
         with self.assertRaises(InsertError):
             await self.client0.insert({'': [[1, 0]]})
index 3306f39f2114d147338ad919e6f0117419995131..5aac01dfcbbe12a2328af7c1c22f0d6e9de701bc 100644 (file)
@@ -19,6 +19,7 @@
 #include <string.h>
 #include <unistd.h>
 #include <xpath/xpath.h>
+#include <assert.h>
 
 #define SIRIDB_BUFFER_FN "buffer.dat"
 
 static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series);
 static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series);
 
+static const uint64_t BUFFER_end = 0xffffffffffffffff;
+
 
 /*
  * Returns 0 if success or EOF in case of an error.
  */
-int siridb_buffer_write_len(
+int siridb_buffer_write_empty(
         siridb_t * siridb,
         siridb_series_t * series)
 {
     return (
         /* go to the series position in buffer */
-        fseeko(  siridb->buffer_fp,
-                series->bf_offset + sizeof(uint32_t),
+        fseeko( siridb->buffer_fp,
+                series->bf_offset + 8,  // 4 bytes are unused
                 SEEK_SET) ||
 
-        /* write new length */
-        fwrite( &series->buffer->len,
-                sizeof(size_t),
+        /* write end ts */
+        fwrite( &BUFFER_end,
+                sizeof(uint64_t),
                 1,
                 siridb->buffer_fp) != 1) ? EOF : 0;
 }
@@ -55,25 +58,30 @@ int siridb_buffer_write_len(
  *
  * Returns 0 if success or EOF in case of an error.
  */
-int siridb_buffer_write_point(
+int siridb_buffer_write_last_point(
         siridb_t * siridb,
-        siridb_series_t * series,
-        uint64_t * ts,
-        qp_via_t * val)
+        siridb_series_t * series)
 {
-    const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t);
+    siridb_point_t * point;
+    const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t) + sizeof(uint64_t);
     char buf[sz];
+    int last_idx = series->buffer->len - 1;
+    assert (last_idx >= 0);
 
-    memcpy(buf, ts, sizeof(uint64_t));
-    memcpy(buf + sizeof(uint64_t), val, sizeof(qp_via_t));
+    point = series->buffer->data + last_idx;
 
-    return (
-        siridb_buffer_write_len(siridb, series) ||
+    memcpy(buf, &point->ts, sizeof(uint64_t));
+    memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t));
+    memcpy(
+        buf + sizeof(uint64_t) + sizeof(qp_via_t),
+        &BUFFER_end,
+        sizeof(uint64_t));
 
+    return (
         /* jump to position where to write the new point */
-        fseeko(  siridb->buffer_fp,
-                16 * (series->buffer->len - 1),
-                SEEK_CUR) ||
+        fseeko( siridb->buffer_fp,
+                series->bf_offset + 8 + (16 * last_idx),
+                SEEK_SET) ||
 
         /* write time-stamp and value */
         fwrite(buf, sz, 1, siridb->buffer_fp) != 1) ? EOF : 0;
index 226fbc0247ec69bb9636a37aaea2efed2ce8d4df..2d8b36277b5d2b60c4ed5ae2a49b40c11ab37168 100644 (file)
@@ -145,7 +145,7 @@ int siridb_series_add_point(
         else
         {
             series->buffer->len = 0;
-            if (siridb_buffer_write_len(siridb, series))
+            if (siridb_buffer_write_empty(siridb, series))
             {
                 ERR_FILE
                 rc = -1;
@@ -154,7 +154,7 @@ int siridb_series_add_point(
     }
     else
     {
-        if (siridb_buffer_write_point(siridb, series, ts, val))
+        if (siridb_buffer_write_last_point(siridb, series))
         {
             ERR_FILE
             log_critical("Cannot write new point to buffer");
@@ -217,7 +217,7 @@ int siridb_series_add_pcache(
         }
 
         series->buffer->len = 0;
-        if (siridb_buffer_write_len(siridb, series))
+        if (siridb_buffer_write_empty(siridb, series))
         {
             ERR_FILE
             return -1;